# Copyright © 2025 Pathway

from __future__ import annotations

from typing import Iterable, Literal

from pathway.internals import api, datasink, datasource
from pathway.internals.config import _check_entitlements
from pathway.internals.expression import ColumnReference
from pathway.internals.runtime_type_check import check_arg_types
from pathway.internals.schema import Schema
from pathway.internals.table import Table
from pathway.internals.table_io import table_from_datasource
from pathway.internals.trace import trace_user_frame
from pathway.io._utils import (
    MessageQueueOutputFormat,
    _get_unique_name,
    check_raw_and_plaintext_only_kwargs_for_message_queues,
    construct_schema_and_data_format,
)


@check_arg_types
@trace_user_frame
def read(
    stream_name: str,
    *,
    schema: type[Schema] | None = None,
    format: Literal["plaintext", "raw", "json"] = "raw",
    autocommit_duration_ms: int = 1500,
    json_field_paths: dict[str, str] | None = None,
    name: str | None = None,
    max_backlog_size: int | None = None,
    debug_data=None,
    **kwargs,
) -> Table:
    """
    Reads a table from an
    `AWS Kinesis stream <https://docs.aws.amazon.com/streams/latest/dev/introduction.html>`_.
    The connection settings are retrieved from the environment.

    There are three supported formats: ``"plaintext"``, ``"raw"``, and ``"json"``.

    For the ``"raw"`` format, the payload is read as raw bytes and added directly to the
    table. In the ``"plaintext"`` format, the payload is decoded from UTF-8 and stored as
    plain text. In both cases, the table will have an autogenerated primary key and a
    single ``"data"`` column representing the payload.

    If you select the ``"json"`` format, the connector parses the message payload as JSON
    and creates table columns based on the schema provided in the ``schema`` parameter. The
    column values come from the corresponding JSON fields.

    Args:
        stream_name: The name of the data stream to be read.
        schema: The table schema, used only when the format is set to ``"json"``.
        format: The input data format, which can be ``"raw"``, ``"plaintext"``, or
            ``"json"``.
        autocommit_duration_ms: The time interval (in milliseconds) between commits.
            After this time, the updates received by the connector are committed and
            added to Pathway's computation graph. Please note that it has to be a
            non-None value in this connector.
        json_field_paths: For the ``"json"`` format, this allows mapping field names to
            paths within the JSON structure. Use the format ``<field_name>: <path>``
            where the path follows the
            `JSON Pointer (RFC 6901) <https://www.rfc-editor.org/rfc/rfc6901>`_.
        name: A unique name for the connector. If provided, this name will be used in
            logs and monitoring dashboards. Additionally, if persistence is enabled, it
            will be used as the name for the snapshot that stores the connector's progress.
        max_backlog_size: Limit on the number of entries read from the input source and kept
            in processing at any moment. Reading pauses when the limit is reached and resumes
            as processing of some entries completes. Useful with large sources that
            emit an initial burst of data to avoid memory spikes.
        debug_data: Static data replacing original one when debug mode is active.

    Returns:
        Table: The table read.

    Example:

    To test the connector locally, you need a way to run Kinesis on your machine.
    You can use the Docker image
    `instructure/kinesalite <https://hub.docker.com/r/instructure/kinesalite/>`_ to spawn
    it.

    Start the container as follows:

    .. code-block:: bash

        docker pull instructure/kinesalite:latest
        docker run -p 4567:4567 --name kinesis-local instructure/kinesalite:latest

    The first command pulls the Kinesis image from Docker Hub.
    The second command starts a container and exposes port ``4567``, the standard port
    used for the connection.

    Since Kinesis now runs locally and the settings are retrieved from the environment,
    configure the required variables so the connector can reach the local instance:

    .. code-block:: bash

        export AWS_ENDPOINT_URL=http://localhost:4567
        export AWS_REGION=us-east-1

    Now you can start testing. First, connect to the local Kinesis instance and create
    a client with `boto3 <https://pypi.org/project/boto3/>`_:

    >>> import boto3
    >>> client = boto3.client(  # doctest: +SKIP
    ...     "kinesis",
    ...     region_name="us-east-1",
    ...     endpoint_url="http://localhost:4567",
    ... )

    Use the created client to create a new stream, for example ``"testing"``:

    >>> client.create_stream(StreamName="testing", ShardCount=1)  # doctest: +SKIP

    The stream is created asynchronously, so you need to wait until its status becomes
    ``"ACTIVE"``. To check the status you can use the ``describe_stream`` method.

    Once the stream is active, send a few records with ``put_record``. Note that the
    payload must be bytes:

    >>> client.put_record(  # doctest: +SKIP
    ...     StreamName="testing",
    ...     PartitionKey="123",
    ...     Data="Hello, world!".encode("utf-8"),
    ... )

    Finally, you have a stream with data. You can now read it using the Pathway connector:

    >>> import pathway as pw
    >>> table = pw.io.kinesis.read("testing", format="plaintext")

    Here you first import Pathway, then read the Kinesis stream.
    The ``"plaintext"`` format decodes UTF-8 so the text ``"Hello, world!"`` can be
    viewed as plain text.

    Finally, write the row to a file using a Pathway output connector, for example JSONLines:

    >>> pw.io.jsonlines.write(table, "output.jsonl")

    Do not forget to call ``pw.run()`` to start the pipeline.
    Once running, the connector continuously monitors the Kinesis stream and writes both
    existing and newly arriving messages to ``output.jsonl``.
    """

    _check_entitlements("kinesis")
    # Kinesis is an unbounded source, so the connector always runs in streaming mode.
    data_storage = api.DataStorage(
        storage_type="kinesis",
        topic=stream_name,
        mode=api.ConnectorMode.STREAMING,
    )
    # The user-facing "raw" format maps to the engine-level "binary" payload format.
    # ``schema`` is rebound: for "raw"/"plaintext" the helper synthesizes the
    # single-column schema regardless of the user-provided one.
    schema, data_format = construct_schema_and_data_format(
        "binary" if format == "raw" else format,
        schema=schema,
        csv_settings=None,
        json_field_paths=json_field_paths,
    )
    data_source_options = datasource.DataSourceOptions(
        commit_duration_ms=autocommit_duration_ms,
        unique_name=_get_unique_name(name, kwargs),
        max_backlog_size=max_backlog_size,
    )
    return table_from_datasource(
        datasource.GenericDataSource(
            datastorage=data_storage,
            dataformat=data_format,
            data_source_options=data_source_options,
            schema=schema,
            datasource_name="kinesis",
        ),
        debug_datasource=datasource.debug_datasource(debug_data),
    )


@check_raw_and_plaintext_only_kwargs_for_message_queues
@check_arg_types
@trace_user_frame
def write(
    table: Table,
    stream_name: str | ColumnReference,
    *,
    format: Literal["raw", "plaintext", "json"] = "json",
    partition_key: ColumnReference | None = None,
    data: ColumnReference | None = None,
    name: str | None = None,
    sort_by: Iterable[ColumnReference] | None = None,
) -> None:
    """
    Streams ``table`` into an
    `AWS Kinesis stream <https://docs.aws.amazon.com/streams/latest/dev/introduction.html>`_.
    The connection settings are retrieved from the environment.

    Args:
        table: The table to write.
        stream_name: The Kinesis stream where data will be written. This can be a
            specific stream name or a reference to a column whose values will be used
            as the stream for each message. If using a column reference, the column must
            contain string values.
        format: Format in which the data is put into Kinesis. Currently ``"json"``,
            ``"plaintext"``, and ``"raw"`` are supported. If the ``"raw"`` format is selected,
            ``table`` must either contain exactly one binary column that will be dumped as it is into the
            Kinesis record, or the reference to the target binary column must be specified explicitly
            in the ``data`` parameter. Similarly, if ``"plaintext"`` is chosen, the table must consist
            of a single column of the string type, or the reference to the target string column
            must be specified explicitly in the ``data`` parameter.
        partition_key: Reference to the column used as the partition key in the produced message.
            It can have any data type, because if it is not a string, Pathway will obtain its
            string representation and use that value. Note that the maximum length of a partition key
            in Kinesis is 256 bytes. If the key is not specified, the internal row key will be used.
        data: Reference to the column that should be used as data in the produced message in
            ``"plaintext"`` or ``"raw"`` format. It can be deduced automatically if the table
            has exactly one column. Otherwise it must be specified directly. It also has to be
            explicitly specified, if ``partition_key`` is set. The type of the column must
            correspond to the format used: ``str`` for the ``"plaintext"`` format and ``binary``
            for the ``"raw"`` format. Note that the maximum length of one message payload in
            Kinesis is 1 MiB.
        name: A unique name for the connector. If provided, this name will be used in
            logs and monitoring dashboards.
        sort_by: If specified, the output will be sorted in ascending order based on the
            values of the given columns within each minibatch. When multiple columns are provided,
            the corresponding value tuples will be compared lexicographically.

    Returns:
        None

    Example:

    The local test setup is the same as for the case of the input connector: you need
    a way to run Kinesis on your machine. You can use the Docker image
    `instructure/kinesalite <https://hub.docker.com/r/instructure/kinesalite/>`_ to spawn
    it.

    Start the container as follows:

    .. code-block:: bash

        docker pull instructure/kinesalite:latest
        docker run -p 4567:4567 --name kinesis-local instructure/kinesalite:latest

    The first command pulls the Kinesis image from Docker Hub.
    The second command starts a container and exposes port ``4567``, the standard port
    used for the connection.

    Since Kinesis now runs locally and the settings are retrieved from the environment,
    configure the required variables so the connector can reach the local instance:

    .. code-block:: bash

        export AWS_ENDPOINT_URL=http://localhost:4567
        export AWS_REGION=us-east-1

    Now you can start testing. First, connect to the local Kinesis instance and create
    a client with `boto3 <https://pypi.org/project/boto3/>`_:

    >>> import boto3
    >>> client = boto3.client(  # doctest: +SKIP
    ...     "kinesis",
    ...     region_name="us-east-1",
    ...     endpoint_url="http://localhost:4567",
    ... )

    Use the created client to create a new stream, for example ``"testing"``.
    For easier testing, create the stream with a single shard as follows:

    >>> client.create_stream(StreamName="testing", ShardCount=1)  # doctest: +SKIP

    The stream is now ready and you can send messages to it with this connector.
    First, import Pathway and create a static table with sample data.
    The code would look like this:

    >>> import pathway as pw
    >>> table = pw.debug.table_from_markdown(
    ...     '''
    ...     | key | value
    ...  1  |   1 | one
    ...  2  |   2 | two
    ...  3  |   3 | three
    ... '''
    ... )

    The table is created; the next step is to write it to the Kinesis stream.
    You can do so as follows:

    >>> pw.io.kinesis.write(
    ...     table,
    ...     stream_name="testing",
    ...     format="plaintext",
    ...     partition_key=table.key,
    ...     data=table.value,
    ... )

    This command streams the table as follows: the column ``key`` is used as the partition key.
    Because it is an integer, the connector converts it to a string. The column ``value``
    is sent as the payload. Remember to call ``pw.run`` to start the pipeline.

    After the execution finishes, you can read the stream with the boto3 library. Start by
    listing all shards:

    >>> shards_response = client.list_shards(StreamName="testing")  # doctest: +SKIP

    Because the stream was created with one shard, the ``"Shards"`` list in the response
    contains only one item. You can retrieve its ID as shown below:

    >>> shard_id = shards_response["Shards"][0]["ShardId"]  # doctest: +SKIP

    To fetch messages, first obtain an iterator for this shard as follows:

    >>> iterator_response = client.get_shard_iterator(  # doctest: +SKIP
    ...     StreamName="testing",
    ...     ShardId=shard_id,
    ...     ShardIteratorType="TRIM_HORIZON",
    ... )
    >>> shard_iterator = iterator_response["ShardIterator"]  # doctest: +SKIP

    The parameter ``TRIM_HORIZON`` instructs Kinesis to read the shard from the beginning.
    If you rerun this example, the stream may contain more messages than expected because
    the connector does not clear streams. Also note that a shard iterator is valid for
    five minutes, so it must be refreshed if needed.

    You can now request the records. Although the API is paginated, three test records can be
    retrieved in a single call as follows:

    >>> records_response = client.get_records(ShardIterator=shard_iterator, Limit=100)  # doctest: +SKIP

    To verify the contents, you can print the received messages with this code:

    >>> for r in records_response["Records"]:  # doctest: +SKIP
    ...    print(f"Partition key: {r['PartitionKey']}; Value: {r['Data']}")
    Partition key: 1; Value: b'one'
    Partition key: 2; Value: b'two'
    Partition key: 3; Value: b'three'

    You could also read these messages using the Pathway Kinesis input connector. Then,
    the reading code would look like this:

    >>> reread_table = pw.io.kinesis.read("testing", format="plaintext")

    This table can then be written to a file or processed further.

    If the table contains more than two columns and you want to keep all data, use the
    JSON format for serialization:

    >>> pw.io.kinesis.write(
    ...     table,
    ...     stream_name="testing",
    ...     format="json",
    ... )

    If you need to split the table output across multiple streams, the column containing
    the stream name can be provided as the ``stream_name`` parameter.
    For example, create a table with an additional column:

    >>> table = pw.debug.table_from_markdown(
    ...     '''
    ...     | key | value | stream
    ...  1  |   1 | one   | testing
    ...  2  |   2 | two   | other
    ...  3  |   3 | three | testing
    ... '''
    ... )

    And then, write this table to multiple streams depending on the ``stream`` column as follows:

    >>> pw.io.kinesis.write(
    ...     table,
    ...     stream_name=table.stream,
    ...     format="json",
    ... )

    As a result, two messages with ``"value"`` equal to ``"one"`` and ``"three"`` are
    added to the ``"testing"`` stream. One message with ``"value"`` equal to ``"two"`` is
    added to the ``"other"`` stream, which must be created beforehand if you are testing
    this part locally.
    """

    # ``stream_name`` is either a static stream name (str) or a per-row column
    # reference; exactly one of the two variables below is non-None.
    dynamic_topic = stream_name if isinstance(stream_name, ColumnReference) else None
    static_topic = stream_name if isinstance(stream_name, str) else None

    # Build the serialized message layout (payload, key, and optional per-row
    # topic column); the helper may extend the table with derived columns.
    message_format = MessageQueueOutputFormat.construct(
        table,
        format=format,
        key=partition_key,
        value=data,
        topic_name=dynamic_topic,
        allowed_key_types=None,
    )
    prepared_table = message_format.table

    data_storage = api.DataStorage(
        storage_type="kinesis",
        topic=static_topic,
        topic_name_index=message_format.topic_name_index,
        key_field_index=message_format.key_field_index,
    )

    sink = datasink.GenericDataSink(
        data_storage,
        message_format.data_format,
        datasink_name="kinesis",
        unique_name=name,
        sort_by=sort_by,
    )
    prepared_table.to(sink)
